// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements.  See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
= MQTT Streamer

== Overview

This streamer consumes messages from one or more MQTT topics and feeds key-value pairs into an `IgniteDataStreamer` instance, using
https://eclipse.org/paho/[Eclipse Paho, window=_blank] as the MQTT client.

You need to provide a stream tuple extractor (either a single-entry or multiple-entries extractor) to process the incoming
message and extract the tuple to insert.
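
As a rough illustration of the work an extractor does, the sketch below parses a message payload into key-value pairs using only the JDK. `PayloadParser` and both of its methods are hypothetical names, and the `key,value` and `;`-separated formats are assumptions made for this example. In a real extractor, this logic would sit inside the `extract` method and receive its bytes from `MqttMessage.getPayload()`.

[source,java]
----
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: parsing logic only, no Ignite or Paho types involved. */
final class PayloadParser {
    /** Parses a single 'key,value' payload, for example "42,hello". */
    static Map.Entry<Integer, String> parseSingle(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8);
        int comma = text.indexOf(',');

        if (comma < 0)
            throw new IllegalArgumentException("Expected 'key,value' but got: " + text);

        // Everything before the first comma is the key; the rest is the value.
        return new AbstractMap.SimpleImmutableEntry<>(
            Integer.parseInt(text.substring(0, comma).trim()),
            text.substring(comma + 1));
    }

    /** Parses several 'key,value' pairs separated by ';' (an assumed format). */
    static Map<Integer, String> parseMultiple(byte[] payload) {
        Map<Integer, String> entries = new LinkedHashMap<>();

        for (String pair : new String(payload, StandardCharsets.UTF_8).split(";")) {
            if (pair.isEmpty())
                continue;

            Map.Entry<Integer, String> e = parseSingle(pair.getBytes(StandardCharsets.UTF_8));
            entries.put(e.getKey(), e.getValue());
        }

        return entries;
    }
}
----

A single-entry extractor would return the result of something like `parseSingle`, while a multiple-entries extractor would return a whole map, as `parseMultiple` does. Malformed payloads raise an exception here; whether to skip or reject such messages is a design choice left to the application.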

This streamer supports:

* Subscribing to a single topic or multiple topics at once.
* Specifying the subscriber's QoS for a single topic or for multiple topics.
* Setting https://www.eclipse.org/paho/files/javadoc/org/eclipse/paho/client/mqttv3/MqttConnectOptions.html[MqttConnectOptions, window=_blank]
to enable features like _last will testament_, _persistent sessions_, etc.
* Specifying the client ID. A random one will be generated and maintained throughout reconnections if the user does not provide one.
* (Re-)Connection retries powered by the https://github.com/rholder/guava-retrying[guava-retrying library, window=_blank].
_Retry wait_ and _retry stop_ policies can be configured.
* Blocking the `start()` method until the client is connected for the first time.

== Example

Here's a trivial code sample showing how to use this streamer:

[tabs]
--
tab:Java[]
[source,java]
----
// Start Ignite.
Ignite ignite = Ignition.start();

// Get a data streamer reference.
IgniteDataStreamer<Integer, String> dataStreamer = ignite.dataStreamer("mycache");

// Create an MQTT data streamer
MqttStreamer<Integer, String> streamer = new MqttStreamer<>();
streamer.setIgnite(ignite);
streamer.setStreamer(dataStreamer);
// brokerUrl is the address of your MQTT broker, for example "tcp://localhost:1883".
streamer.setBrokerUrl(brokerUrl);
streamer.setBlockUntilConnected(true);

// Set a single tuple extractor to extract items in the format 'key,value' where key => Int, and value => String
// (using Guava here).
streamer.setSingleTupleExtractor(new StreamSingleTupleExtractor<MqttMessage, Integer, String>() {
    @Override public Map.Entry<Integer, String> extract(MqttMessage msg) {
        List<String> s = Splitter.on(",").splitToList(new String(msg.getPayload(), StandardCharsets.UTF_8));

        return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1));
    }
});

// Consume from multiple topics at once.
streamer.setTopics(Arrays.asList("def", "ghi", "jkl", "mno"));

// Start the MQTT Streamer.
streamer.start();
----
--

Refer to the Javadocs of the `ignite-mqtt-ext` module for more info on the available options.
